94c6c7dd1cc0374fcbb06fbbc1e73df28f625db1,cygnus-ngsi/src/main/java/com/telefonica/iot/cygnus/sinks/NGSISink.java,Accumulator,accumulateByServicePath,#Map#NotifyContextRequest#,755
Before Change
NGSIEvent cygnusEvent = new NGSIEvent(
recvTimeTs, service, notifiedServicePaths[i], null, null,
notification.getContextResponses().get(i).getContextElement());
batch.addEvent(destination, cygnusEvent);
} // for
} else {
String[] groupedServicePaths = headers.get(NGSIConstants.FLUME_HEADER_GROUPED_SERVICE_PATHS).split(",");
After Change
batch.addEvent(destination, event);
} // accumulateByService
/**
 * Adds the given event to the batch under a destination of the form
 * {@code <service>_<servicePath>}, both parts being read from the event headers.
 * <p>
 * Which pair of headers is used depends on whether the event carries a mapped
 * context element and on the {@code enableGrouping} / {@code enableNameMappings}
 * flags:
 * <ul>
 * <li>no mapped element, grouping enabled: FIWARE service + grouped service path;</li>
 * <li>mapped element present, name mappings enabled: mapped service + mapped service path;</li>
 * <li>otherwise: FIWARE service + FIWARE service path.</li>
 * </ul>
 * A header that is absent from the map is rendered as the text {@code "null"} by
 * the string concatenation; no validation is done here.
 *
 * @param event event to be accumulated; its headers and mapped context element are read
 */
private void accumulateByServicePath(NGSIEvent event) {
    Map<String, String> eventHeaders = event.getHeaders();
    boolean hasMappedElement = event.getMappedCE() != null;
    String servicePart;
    String servicePathPart;

    if (!hasMappedElement && enableGrouping) {
        // 'TODO': remove this branch when Grouping Rules are definitely removed
        servicePart = eventHeaders.get(CommonConstants.HEADER_FIWARE_SERVICE);
        servicePathPart = eventHeaders.get(NGSIConstants.FLUME_HEADER_GROUPED_SERVICE_PATH);
    } else if (hasMappedElement && enableNameMappings) {
        servicePart = eventHeaders.get(NGSIConstants.FLUME_HEADER_MAPPED_SERVICE);
        servicePathPart = eventHeaders.get(NGSIConstants.FLUME_HEADER_MAPPED_SERVICE_PATH);
    } else {
        // Neither grouping nor name mappings apply: use the notified values as they are
        servicePart = eventHeaders.get(CommonConstants.HEADER_FIWARE_SERVICE);
        servicePathPart = eventHeaders.get(CommonConstants.HEADER_FIWARE_SERVICE_PATH);
    } // if else if

    batch.addEvent(servicePart + "_" + servicePathPart, event);
} // accumulateByServicePath
private void accumulateByEntity(NGSIEvent event) {